{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Display locations of dense [AIS broadcast reports](https://marinecadastre.gov/ais/) at the port of Miami, FL.\n",
    "\n",
    "Package the application using:\n",
    "\n",
    "```bash\n",
    "mvn clean install\n",
    "```\n",
    "\n",
    "Start PySpark:\n",
    "\n",
    "```bash\n",
    "export PATH=${SPARK_HOME}/bin:${PATH}\n",
    "export SPARK_LOCAL_IP=localhost\n",
    "export PYSPARK_DRIVER_PYTHON=jupyter\n",
    "export PYSPARK_DRIVER_PYTHON_OPTS='lab --ip=0.0.0.0 --allow-root --no-browser --NotebookApp.token=\"\"'\n",
    "export PACKAGES=\"com.esri:filegdb:0.12.5\"\n",
    "pyspark\\\n",
    "  --master local[*]\\\n",
    "  --num-executors 1\\\n",
    "  --driver-memory 30G\\\n",
    "  --executor-memory 30G\\\n",
    "  --conf spark.ui.enabled=false\\\n",
    "  --packages ${PACKAGES}\\\n",
    "  --exclude-packages org.scala-lang:scala-reflect\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Get a reference to the underlying JVM"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "jvm = spark._jvm"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Get a reference to the `FileGDB` Scala object"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "gdb = jvm.com.esri.gdb.FileGDB"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Print the list of tables in the geo database:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "NameIndex(MiamiExtent,9)\n",
      "NameIndex(Voyage,10)\n",
      "NameIndex(Broadcast,11)\n",
      "NameIndex(Vessel,12)\n",
      "NameIndex(BaseStations,13)\n",
      "NameIndex(AttributeUnits,14)\n",
      "NameIndex(Extent,15)\n"
     ]
    }
   ],
   "source": [
    "gdb_path = os.path.join('data','Miami.gdb')\n",
    "tables = gdb.listTables(gdb_path)\n",
    "for t in tables:\n",
    "    print(t)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Print the schema of `Broadcast` (note how `x` and `y` are subfields to `Shape`)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- OBJECTID: integer (nullable = false)\n",
      " |-- Shape: struct (nullable = true)\n",
      " |    |-- x: double (nullable = true)\n",
      " |    |-- y: double (nullable = true)\n",
      " |-- SOG: integer (nullable = true)\n",
      " |-- COG: integer (nullable = true)\n",
      " |-- Heading: integer (nullable = true)\n",
      " |-- ROT: integer (nullable = true)\n",
      " |-- BaseDateTime: timestamp (nullable = true)\n",
      " |-- Status: integer (nullable = true)\n",
      " |-- VoyageID: integer (nullable = true)\n",
      " |-- MMSI: integer (nullable = true)\n",
      " |-- ReceiverType: string (nullable = true)\n",
      " |-- ReceiverID: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "schema = gdb.schema(gdb_path,'Broadcast')\n",
    "print(schema.treeString())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a Spark Dataframe and register it as a table for upcoming SQL operations"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read \\\n",
    "    .format(\"com.esri.gdb\") \\\n",
    "    .options(path=gdb_path, name=\"Broadcast\") \\\n",
    "    .load()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.registerTempTable(\"Broadcast\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create temp table `QR` by mapping a `Broadcast` event to a cell location.  The cell size is 0.001 degrees."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "cell_1 = 0.001\n",
    "cell_2 = cell_1 * 0.5\n",
    "sql(f\"select cast(floor(Shape.x/{cell_1}) as INT) as q,cast(floor(Shape.y/{cell_1}) as INT) as r from Broadcast\")\\\n",
    ".registerTempTable(\"QR\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Aggregate by cell and report back only the cells with more that 100 `Broadcast` events.\n",
    "Map each cell q/r to an x/y geo location and convert the Spark dataframe into a Pandas dataframe."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+-------+-----+\n",
      "|       x|      y|  pop|\n",
      "+--------+-------+-----+\n",
      "|-80.2505|25.8005|15780|\n",
      "|-80.1535|25.7655|  296|\n",
      "|-80.1405|25.7565| 2091|\n",
      "|-80.1155|25.7575|  159|\n",
      "|-80.1215|25.7585|  119|\n",
      "|-80.1685|25.7765| 1439|\n",
      "|-80.1835|25.7795|  381|\n",
      "|-80.1785|25.7725|  105|\n",
      "|-80.0925|25.7775|  112|\n",
      "|-80.1795|25.6945| 5439|\n",
      "|-80.0905|25.7955|  609|\n",
      "|-80.2165|25.7835|  280|\n",
      "|-80.0925|25.7985|  144|\n",
      "|-80.2165|25.7825| 3065|\n",
      "|-80.1005|25.7745| 3236|\n",
      "|-80.1755|25.7805|  150|\n",
      "|-80.1815|25.7845|  155|\n",
      "|-80.1395|25.7655|  182|\n",
      "|-80.1565|25.7665|  916|\n",
      "|-80.2445|25.7955| 5609|\n",
      "+--------+-------+-----+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sql(f\"select q*{cell_1}+{cell_2} as x,r*{cell_1}+{cell_2} as y,count(1) as pop from QR group by q,r having pop > 100\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a `dark-gray` map centered around Miami, FL."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# gis = GIS()\n",
    "# m = gis.map('Miami, FL')\n",
    "# m.basemap = 'dark-gray'\n",
    "# m"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Add to the map a layer whose content is imported from a Pandas dataframe."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# m.add_layer(gis.content.import_data(pdf))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
